package org.databandtech.flink.sink;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Static helpers for attaching Elasticsearch 7 sinks to Flink streams and for
 * parsing Elasticsearch host-address strings.
 *
 * <p>All members are static; the class is not meant to be instantiated.
 */
public class ESSinkUtil {

	/** Utility class — prevent instantiation. */
	private ESSinkUtil() {
	}

	/**
	 * Adds an Elasticsearch sink (with a retrying failure handler) to {@code data}.
	 *
	 * <p>NOTE(review): {@code parameterTool} was never read by the original
	 * implementation; the parameter is retained only so existing callers keep
	 * compiling. This overload now simply delegates to the variant without it,
	 * removing the previously duplicated builder-configuration code.
	 *
	 * @param <T>                 element type of the stream
	 * @param hosts               Elasticsearch HTTP hosts to connect to
	 * @param bulkFlushMaxActions max buffered actions before a bulk flush
	 * @param parallelism         parallelism of the sink operator
	 * @param data                stream to attach the sink to
	 * @param func                converts stream elements into index requests
	 * @param parameterTool       unused; kept for backward compatibility
	 */
	public static <T> void addSinkBySingleOutputStreamOperator(
			List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
			SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func, ParameterTool parameterTool) {
		// Delegate so the builder configuration lives in exactly one place.
		addSinkBySingleOutputStreamOperator(hosts, bulkFlushMaxActions, parallelism, data, func);
	}

	/**
	 * Adds an Elasticsearch sink (with a retrying failure handler) to {@code data}.
	 *
	 * @param <T>                 element type of the stream
	 * @param hosts               Elasticsearch HTTP hosts to connect to
	 * @param bulkFlushMaxActions max buffered actions before a bulk flush
	 * @param parallelism         parallelism of the sink operator
	 * @param data                stream to attach the sink to
	 * @param func                converts stream elements into index requests
	 */
	public static <T> void addSinkBySingleOutputStreamOperator(
			List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
			SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {

		ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
		esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
		esSinkBuilder.setFailureHandler(new RetryRequestFailureHandler());

		data.addSink(esSinkBuilder.build()).setParallelism(parallelism);
	}

	/**
	 * Adds an Elasticsearch sink that indexes {@code Tuple2<username, age>}
	 * elements into the given index.
	 *
	 * @param httpHosts            Elasticsearch HTTP hosts to connect to
	 * @param bulkFlushMaxActions  max buffered actions before a bulk flush
	 * @param userDataStreamSource source of (username, age) tuples
	 * @param index                target Elasticsearch index name
	 */
	public static void addSinkByDataStreamSource(
			List<HttpHost> httpHosts, int bulkFlushMaxActions, DataStreamSource<Tuple2<String, Integer>> userDataStreamSource, String index) {

		ElasticsearchSink.Builder<Tuple2<String, Integer>> esSinkBuilder = new ElasticsearchSink.Builder<>(
				httpHosts,
				new ElasticsearchSinkFunction<Tuple2<String, Integer>>() {

					private static final long serialVersionUID = 1L;

					/** Maps one tuple to an index request: f0 -> "username", f1 -> "age". */
					private IndexRequest createIndexRequest(Tuple2<String, Integer> element) {
						Map<String, Object> json = new HashMap<>();
						json.put("username", element.f0);
						json.put("age", element.f1);

						return Requests.indexRequest()
								.index(index)
								.source(json);
					}

					@Override
					public void process(Tuple2<String, Integer> element, RuntimeContext ctx, RequestIndexer indexer) {
						indexer.add(createIndexRequest(element));
					}
				});
		esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
		// Consistency fix: the other sink helpers install a retrying failure
		// handler; previously this method silently used the default fail-fast one.
		esSinkBuilder.setFailureHandler(new RetryRequestFailureHandler());
		userDataStreamSource.addSink(esSinkBuilder.build());
	}

	/**
	 * Parses a comma-separated host list into {@link HttpHost} instances.
	 *
	 * <p>Each entry may be either a full URL ({@code http://host:9200} — parsed
	 * via {@link URL}, so a missing port becomes {@code -1}, i.e. the scheme
	 * default) or a bare {@code host:port} pair. Entries are trimmed so that
	 * whitespace after commas ({@code "a:9200, b:9200"}) is tolerated.
	 *
	 * @param hosts comma-separated host list
	 * @return one {@link HttpHost} per entry, in input order
	 * @throws MalformedURLException if an entry matches neither accepted form
	 */
	public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException {
		String[] hostList = hosts.split(",");
		List<HttpHost> addresses = new ArrayList<>(hostList.length);
		for (String rawHost : hostList) {
			String host = rawHost.trim();
			if (host.startsWith("http")) {
				URL url = new URL(host);
				addresses.add(new HttpHost(url.getHost(), url.getPort()));
			} else {
				String[] parts = host.split(":", 2);
				if (parts.length > 1) {
					addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));
				} else {
					// Include the offending entry so misconfiguration is diagnosable.
					throw new MalformedURLException("invalid elasticsearch hosts format: " + host);
				}
			}
		}
		return addresses;
	}

}
